-
Notifications
You must be signed in to change notification settings - Fork 28
/
model.go
299 lines (261 loc) · 12.6 KB
/
model.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package flow defines interfaces for the management of authenticated bidirectional byte Flows.
// TODO(suharshs): This is a work in progress and can change without notice.
//
// A Flow represents a flow-controlled authenticated byte stream between two endpoints.
//
// A Manager manages the creation of Flows and the re-use of network connections.
// A Manager can Dial out to a specific remote end to receive a Flow to that end.
// A Manager can Listen on multiple protocols and addresses. Listening
// causes the Manager to accept flows from any of the specified protocols and addresses.
// Additionally, a Manager will accept incoming flows on connections that it
// dialed out, for the lifetime of those connections.
package flow
import (
"io"
"net"
"time"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
)
// Manager is the interface for managing the creation of Flows.
type Manager interface {
// Listen causes the Manager to accept flows from the provided protocol and address.
// Listen may be called multiple times.
// If error != nil, creating a net.Listener to the address failed.
// Otherwise, if error == nil, the returned chan will block until the
// net.Listener fails while accepting connections. The caller may then
// choose to relisten on the protocol and address.
Listen(ctx *context.T, protocol, address string) (<-chan struct{}, error)
// ProxyListen causes the Manager to accept flows from the specified endpoint.
// The endpoint must correspond to a vanadium proxy.
// If error != nil, establishing a connection to the Proxy failed.
// Otherwise, if error == nil, the returned chan will block until the
// connection to the proxy endpoint fails. The caller may then choose to retry
// the connection.
// name is an identifier of the proxy. It can be used to access errors
// in ListenStatus.ProxyErrors.
ProxyListen(ctx *context.T, name string, endpoint naming.Endpoint) (<-chan struct{}, error)
// Status returns the current ListenStatus of the manager.
Status() ListenStatus
// StopListening stops listening on all currently listening addresses and proxies.
// All outstanding calls to Accept will return an error.
// It is safe to begin listening again.
StopListening(ctx *context.T)
// Accept blocks until a new Flow has been initiated by a remote process.
// Flows are accepted from addresses that the Manager is listening on,
// including outgoing dialed connections.
//
// For example:
//
//	_, err := m.Listen(ctx, "tcp", ":0")
//	for {
//		flow, err := m.Accept(ctx)
//		// process flow
//	}
//
// can be used to accept Flows initiated by remote processes.
Accept(ctx *context.T) (Flow, error)
// Dial creates a Flow to the provided remote endpoint, using 'auth' to
// determine the blessings that will be sent to the remote end.
//
// If the manager has a non-null RoutingID, the Manager will re-use connections
// by Listening on Dialed connections for the lifetime of the Dialed connection.
//
// channelTimeout specifies the duration we are willing to wait before determining
// that connections managed by this Manager are unhealthy and should be
// closed.
Dial(ctx *context.T, remote naming.Endpoint, auth PeerAuthorizer, channelTimeout time.Duration) (Flow, error)
// DialSideChannelCached returns a new flow over an existing cached
// connection that is not factored in when deciding the underlying
// connection's idleness, etc.
DialSideChannelCached(ctx *context.T, remote naming.Endpoint, auth PeerAuthorizer, underlying ManagedConn, channelTimeout time.Duration) (Flow, error)
// DialCached creates a Flow to the provided remote endpoint using only cached
// connections from previous Listen or Dial calls.
// If no cached connection exists, an error will be returned.
//
// 'auth' is used to determine the blessings that will be sent to the remote end.
//
// channelTimeout specifies the duration we are willing to wait before determining
// that connections managed by this Manager are unhealthy and should be
// closed.
DialCached(ctx *context.T, remote naming.Endpoint, auth PeerAuthorizer, channelTimeout time.Duration) (Flow, error)
// RoutingID returns the naming.RoutingID of the flow.Manager.
// If the RoutingID of the manager is naming.NullRoutingID, the manager can
// only be used to Dial outgoing calls.
RoutingID() naming.RoutingID
// Closed returns a channel that remains open for the lifetime of the Manager
// object. Once the channel is closed any operations on the Manager will
// necessarily fail.
Closed() <-chan struct{}
}
// ListenStatus describes the listening state of a Manager, as returned by
// Manager.Status.
type ListenStatus struct {
// Endpoints contains the endpoints that the Manager has explicitly
// called Listen on. The Manager will accept new flows on these endpoints.
// Proxied endpoints are included in the results.
// If the Manager is not listening on any endpoints, an endpoint with the
// Manager's RoutingID will be returned for use in bidirectional RPC.
// Returned endpoints all have the Manager's unique RoutingID.
Endpoints []naming.Endpoint
// ListenErrors contains the set of errors encountered when listening on
// the network or accepting a connection from a listener. Entries are keyed by
// the (protocol, address) pair specified in the ListenSpec.
ListenErrors map[struct{ Protocol, Address string }]error
// ProxyErrors contains the set of errors encountered when listening on
// proxies. Entries are keyed by the name provided to ProxyListen. If the
// entry exists and is nil, the ProxyListen was successful.
ProxyErrors map[string]error
// Dirty will be closed if a status change occurs. Callers should
// requery manager.Status() to get the fresh server status.
// Dirty will be nil once the manager is Closed.
Dirty <-chan struct{}
}
// PeerAuthorizer is the interface used in performing security authorization.
type PeerAuthorizer interface {
// AuthorizePeer authorizes the remote blessings and returns the remote
// blessing names (peerBlessingNames) together with the names that were
// rejected (rejectedPeerNames).
AuthorizePeer(ctx *context.T,
localEndpoint naming.Endpoint,
remoteEndpoint naming.Endpoint,
remoteBlessings security.Blessings,
remoteDischarges map[string]security.Discharge,
) (peerBlessingNames []string, rejectedPeerNames []security.RejectedBlessing, _ error)
// BlessingsForPeer returns the blessings and discharges that should be
// presented to the remote end with peerBlessingNames.
BlessingsForPeer(ctx *context.T, peerBlessingNames []string) (
security.Blessings, map[string]security.Discharge, error)
}
// ManagedConn represents the connection onto which this flow is multiplexed.
// Since this ManagedConn may be shared between many flows it wouldn't be safe
// to read and write to it directly. We just provide some metadata.
type ManagedConn interface {
// LocalEndpoint returns the local vanadium Endpoint.
LocalEndpoint() naming.Endpoint
// RemoteEndpoint returns the remote vanadium Endpoint.
RemoteEndpoint() naming.Endpoint
// RemoteBlessings returns the remote end's blessings presented during connection
// establishment. These may be different from those presented during flow creation.
RemoteBlessings() security.Blessings
// LocalBlessings returns the local end's blessings presented during connection
// establishment. These may be different from those presented during flow creation.
LocalBlessings() security.Blessings
// RemoteDischarges returns the discharges presented by the remote end of the
// connection during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
RemoteDischarges() map[string]security.Discharge
// LocalDischarges returns the discharges presented by the local end of the
// connection during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
LocalDischarges() map[string]security.Discharge
// CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
CommonVersion() version.RPCVersion
// RTT returns the last round-trip-time of the health-check sent on this connection.
// A zero duration is returned if an RTT is unavailable.
RTT() time.Duration
// LastUsed returns the last time the connection had bytes read or written on it.
LastUsed() time.Time
// Closed returns a channel that remains open until the connection has been closed.
Closed() <-chan struct{}
}
// PinnedConn represents a connection that is pinned to the manager's cache.
// Reconnection of a PinnedConn is attempted automatically when the underlying
// connection is closed.
type PinnedConn interface {
// Conn returns an underlying ManagedConn for the PinnedConn. The returned
// ManagedConn may change due to reconnections. The returned value will always
// be non-nil.
Conn() ManagedConn
// Unpin unpins the conn from the Manager's cache, making it more susceptible
// to cache eviction. Any connection related opts passed to PinConnection
// (e.g. ChannelTimeout) will become invalid when Unpin is called.
// Unpin is idempotent.
Unpin()
}
// MsgWriter defines an interface for writing messages.
type MsgWriter interface {
// WriteMsg is like Write, but allows writing more than one buffer at a time.
// The data in each buffer is written sequentially onto the flow. Returns the
// number of bytes written. WriteMsg must return a non-nil error if it writes
// less than the total number of bytes from all buffers.
WriteMsg(data ...[]byte) (int, error)
}
// MsgReader defines an interface for reading messages.
type MsgReader interface {
// ReadMsg is like Read, but it reads bytes in chunks. Depending on the
// implementation the batch boundaries might or might not be significant.
ReadMsg() ([]byte, error)
}
// MsgReadWriteCloser combines the MsgReader and MsgWriter interfaces and
// adds the Close method.
type MsgReadWriteCloser interface {
MsgWriter
MsgReader
// Close closes the MsgReadWriteCloser. After Close is called, all writes will
// return an error, but reads of already queued data may succeed.
Close() error
}
// Flow is the interface for a flow-controlled channel multiplexed over a Conn.
// It embeds both io.ReadWriter and MsgReadWriteCloser, so it offers the
// Read/Write methods as well as ReadMsg/WriteMsg/Close.
type Flow interface {
io.ReadWriter
MsgReadWriteCloser
// WriteMsgAndClose performs WriteMsg and then closes the flow.
WriteMsgAndClose(data ...[]byte) (int, error)
// SetDeadlineContext sets the context associated with the flow.
// It derives a context from the passed in context and the passed in deadline.
// Typically this is used to set state that is only available after
// the flow is connected, such as the language of the request.
SetDeadlineContext(ctx *context.T, deadline time.Time) *context.T
// LocalEndpoint returns the local vanadium Endpoint.
LocalEndpoint() naming.Endpoint
// RemoteEndpoint returns the remote vanadium Endpoint.
RemoteEndpoint() naming.Endpoint
// RemoteAddr returns the remote address of the peer.
RemoteAddr() net.Addr
// LocalBlessings returns the blessings presented by the local end of the flow
// during authentication.
LocalBlessings() security.Blessings
// RemoteBlessings returns the blessings presented by the remote end of the
// flow during authentication.
RemoteBlessings() security.Blessings
// LocalDischarges returns the discharges presented by the local end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
LocalDischarges() map[string]security.Discharge
// RemoteDischarges returns the discharges presented by the remote end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
RemoteDischarges() map[string]security.Discharge
// Conn returns the connection the flow is multiplexed on.
Conn() ManagedConn
// Closed returns a channel that remains open until the flow has been closed or
// the ctx passed to the Dial or Accept call used to create the flow has been
// cancelled.
Closed() <-chan struct{}
// DisableFragmentation disables fragmentation of the []byte. This is used by
// xproxyd.
// NOTE(review): "the []byte" presumably refers to the buffers written to the
// flow; confirm against the implementation.
DisableFragmentation()
}
// Conn is the connection onto which flows are multiplexed.
// It contains information about the Conn's local network address. Other information
// is not available until the authentication handshake is complete.
type Conn interface {
MsgReadWriteCloser
// LocalAddr returns the Conn's local network address.
LocalAddr() net.Addr
// RemoteAddr returns the Conn's remote network address.
RemoteAddr() net.Addr
}
// Listener provides methods for accepting new Conns.
type Listener interface {
// Accept waits for and returns a new Conn.
Accept(ctx *context.T) (Conn, error)
// Addr returns the Listener's network address.
Addr() net.Addr
// Close closes the Listener. After Close is called all Accept calls will fail.
Close() error
}