/
channel_opts.go
308 lines (254 loc) · 9.6 KB
/
channel_opts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
// Copyright (c) 2015 Uber Technologies, Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package testutils
import (
"flag"
"net"
"testing"
"time"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/tos"
"go.uber.org/atomic"
"golang.org/x/net/context"
)
var connectionLog = flag.Bool("connectionLog", false, "Enables connection logging in tests")
// Default service names for the test channels.
const (
DefaultServerName = "testService"
DefaultClientName = "testService-client"
)
// ChannelOpts contains options to create a test channel using WithServer
type ChannelOpts struct {
tchannel.ChannelOptions
// ServiceName defaults to DefaultServerName or DefaultClientName.
ServiceName string
// LogVerification contains options for controlling the log verification.
LogVerification LogVerification
// DisableRelay disables the relay interposed between clients/servers.
// By default, all tests are run with a relay interposed.
DisableRelay bool
// DisableServer disables creation of the TChannel server.
// This is typically only used in relay tests when a custom server is required.
DisableServer bool
// OnlyRelay instructs TestServer the test must only be run with a relay.
OnlyRelay bool
// RunCount is the number of times the test should be run. Zero or
// negative values are treated as a single run.
RunCount int
// postFns is a list of functions that are run after the test.
// They are run even if the test fails.
postFns []func()
}
// LogVerification contains options to control the log verification.
type LogVerification struct {
Disabled bool
Filters []LogFilter
}
// LogFilter is a single substring match that can be ignored.
type LogFilter struct {
// Filter specifies the substring match to search
// for in the log message to skip raising an error.
Filter string
// Count is the maximum number of allowed warn+ logs matching
// Filter before errors are raised.
Count uint
// FieldFilters specifies expected substring matches for fields.
FieldFilters map[string]string
}
// Copy copies the channel options (so that they can be safely modified).
func (o *ChannelOpts) Copy() *ChannelOpts {
if o == nil {
return NewOpts()
}
copiedOpts := *o
return &copiedOpts
}
// SetServiceName sets ServiceName.
func (o *ChannelOpts) SetServiceName(svcName string) *ChannelOpts {
o.ServiceName = svcName
return o
}
// SetProcessName sets the ProcessName in ChannelOptions.
func (o *ChannelOpts) SetProcessName(processName string) *ChannelOpts {
o.ProcessName = processName
return o
}
// SetStatsReporter sets StatsReporter in ChannelOptions.
func (o *ChannelOpts) SetStatsReporter(statsReporter tchannel.StatsReporter) *ChannelOpts {
o.StatsReporter = statsReporter
return o
}
// SetFramePool sets FramePool in DefaultConnectionOptions.
func (o *ChannelOpts) SetFramePool(framePool tchannel.FramePool) *ChannelOpts {
o.DefaultConnectionOptions.FramePool = framePool
return o
}
// SetHealthChecks sets HealthChecks in DefaultConnectionOptions.
func (o *ChannelOpts) SetHealthChecks(healthChecks tchannel.HealthCheckOptions) *ChannelOpts {
o.DefaultConnectionOptions.HealthChecks = healthChecks
return o
}
// SetSendBufferSize sets the SendBufferSize in DefaultConnectionOptions.
func (o *ChannelOpts) SetSendBufferSize(bufSize int) *ChannelOpts {
o.DefaultConnectionOptions.SendBufferSize = bufSize
return o
}
// SetSendBufferSizeOverrides sets the SendBufferOverrides in DefaultConnectionOptions.
func (o *ChannelOpts) SetSendBufferSizeOverrides(overrides []tchannel.SendBufferSizeOverride) *ChannelOpts {
o.DefaultConnectionOptions.SendBufferSizeOverrides = overrides
return o
}
// SetTosPriority set TosPriority in DefaultConnectionOptions.
func (o *ChannelOpts) SetTosPriority(tosPriority tos.ToS) *ChannelOpts {
o.DefaultConnectionOptions.TosPriority = tosPriority
return o
}
// SetChecksumType sets the ChecksumType in DefaultConnectionOptions.
func (o *ChannelOpts) SetChecksumType(checksumType tchannel.ChecksumType) *ChannelOpts {
o.DefaultConnectionOptions.ChecksumType = checksumType
return o
}
// SetTimeNow sets TimeNow in ChannelOptions.
func (o *ChannelOpts) SetTimeNow(timeNow func() time.Time) *ChannelOpts {
o.TimeNow = timeNow
return o
}
// SetTimeTicker sets TimeTicker in ChannelOptions.
func (o *ChannelOpts) SetTimeTicker(timeTicker func(d time.Duration) *time.Ticker) *ChannelOpts {
o.TimeTicker = timeTicker
return o
}
// DisableLogVerification disables log verification for this channel.
func (o *ChannelOpts) DisableLogVerification() *ChannelOpts {
o.LogVerification.Disabled = true
return o
}
// NoRelay disables running the test with a relay interposed.
func (o *ChannelOpts) NoRelay() *ChannelOpts {
o.DisableRelay = true
return o
}
// SetRelayOnly instructs TestServer to only run with a relay in front of this channel.
func (o *ChannelOpts) SetRelayOnly() *ChannelOpts {
o.OnlyRelay = true
return o
}
// SetDisableServer disables creation of the TChannel server.
// This is typically only used in relay tests when a custom server is required.
func (o *ChannelOpts) SetDisableServer() *ChannelOpts {
o.DisableServer = true
return o
}
// SetRunCount sets the number of times run the test.
func (o *ChannelOpts) SetRunCount(n int) *ChannelOpts {
o.RunCount = n
return o
}
// AddLogFilter sets an allowed filter for warning/error logs and sets
// the maximum number of times that log can occur.
func (o *ChannelOpts) AddLogFilter(filter string, maxCount uint, fields ...string) *ChannelOpts {
fieldFilters := make(map[string]string)
for i := 0; i < len(fields); i += 2 {
fieldFilters[fields[i]] = fields[i+1]
}
o.LogVerification.Filters = append(o.LogVerification.Filters, LogFilter{
Filter: filter,
Count: maxCount,
FieldFilters: fieldFilters,
})
return o
}
func (o *ChannelOpts) addPostFn(f func()) {
o.postFns = append(o.postFns, f)
}
// SetRelayHost sets the channel's RelayHost, which enables relaying.
func (o *ChannelOpts) SetRelayHost(rh tchannel.RelayHost) *ChannelOpts {
o.ChannelOptions.RelayHost = rh
return o
}
// SetRelayLocal sets the channel's relay local handlers for service names
// that should be handled by the relay channel itself.
func (o *ChannelOpts) SetRelayLocal(relayLocal ...string) *ChannelOpts {
o.ChannelOptions.RelayLocalHandlers = relayLocal
return o
}
// SetRelayMaxTimeout sets the maximum allowable timeout for relayed calls.
func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
o.ChannelOptions.RelayMaxTimeout = d
return o
}
// SetRelayMaxConnectionTimeout sets the maximum timeout for connection attempts.
func (o *ChannelOpts) SetRelayMaxConnectionTimeout(d time.Duration) *ChannelOpts {
o.ChannelOptions.RelayMaxConnectionTimeout = d
return o
}
// SetRelayMaxTombs sets the maximum number of tombs tracked in the relayer.
func (o *ChannelOpts) SetRelayMaxTombs(maxTombs uint64) *ChannelOpts {
o.ChannelOptions.RelayMaxTombs = maxTombs
return o
}
// SetOnPeerStatusChanged sets the callback for channel status change
// noficiations.
func (o *ChannelOpts) SetOnPeerStatusChanged(f func(*tchannel.Peer)) *ChannelOpts {
o.ChannelOptions.OnPeerStatusChanged = f
return o
}
// SetMaxIdleTime sets a threshold after which idle connections will
// automatically get dropped. See idle_sweep.go for more details.
func (o *ChannelOpts) SetMaxIdleTime(d time.Duration) *ChannelOpts {
o.ChannelOptions.MaxIdleTime = d
return o
}
// SetIdleCheckInterval sets the frequency of the periodic poller that removes
// stale connections from the channel.
func (o *ChannelOpts) SetIdleCheckInterval(d time.Duration) *ChannelOpts {
o.ChannelOptions.IdleCheckInterval = d
return o
}
// SetDialer sets the dialer used for outbound connections
func (o *ChannelOpts) SetDialer(f func(context.Context, string, string) (net.Conn, error)) *ChannelOpts {
o.ChannelOptions.Dialer = f
return o
}
// SetConnContext sets the connection's ConnContext function
func (o *ChannelOpts) SetConnContext(f func(context.Context, net.Conn) context.Context) *ChannelOpts {
o.ConnContext = f
return o
}
func defaultString(v string, defaultValue string) string {
if v == "" {
return defaultValue
}
return v
}
// NewOpts returns a new ChannelOpts that can be used in a chained fashion.
func NewOpts() *ChannelOpts { return &ChannelOpts{} }
// DefaultOpts will return opts if opts is non-nil, NewOpts otherwise.
func DefaultOpts(opts *ChannelOpts) *ChannelOpts {
if opts == nil {
return NewOpts()
}
return opts
}
// WrapLogger wraps the given logger with extra verification.
func (v *LogVerification) WrapLogger(t testing.TB, l tchannel.Logger) tchannel.Logger {
return errorLogger{l, t, v, &errorLoggerState{
matchCount: make([]atomic.Uint32, len(v.Filters)),
}}
}