/
servers.scala
397 lines (346 loc) · 15.5 KB
/
servers.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
package com.twitter.finatra.thrift
import com.google.inject.Module
import com.twitter.app.Flag
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.service.NilService
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.ListeningServer
import com.twitter.finagle.NullServer
import com.twitter.finagle.Service
import com.twitter.finagle.ThriftMux
import com.twitter.finagle.tracing.Tracer
import com.twitter.finatra.thrift.modules.ExceptionManagerModule
import com.twitter.finatra.thrift.modules.ThriftResponseClassifierModule
import com.twitter.finatra.thrift.response.ThriftResponseClassifier
import com.twitter.finatra.thrift.routing.JavaThriftRouter
import com.twitter.finatra.thrift.routing.ThriftRouter
import com.twitter.inject.annotations.Lifecycle
import com.twitter.inject.modules.StackTransformerModule
import com.twitter.inject.server.AbstractTwitterServer
import com.twitter.inject.server.PortUtils
import com.twitter.inject.server.TwitterServer
import com.twitter.util.Await
import com.twitter.util.Duration
import java.net.InetSocketAddress
private object ThriftServerTrait {
/**
* Sentinel used to indicate no thrift server announcement.
*/
val NoThriftAnnouncement: String = ""
}
/**
* A basic ThriftServer implemented by a {{{com.twitter.finagle.Service[Array[Byte], Array[Byte]]}}}.
*
* @note Java users are encouraged to use [[AbstractThriftServerTrait]] instead.
*/
trait ThriftServerTrait extends TwitterServer {
/** Add Framework Modules */
addFrameworkModules(
ExceptionManagerModule,
StackTransformerModule,
thriftResponseClassifierModule)
/**
* Default external Thrift port used as the [[Flag]] default value for [[thriftPortFlag]]. This
* can be overridden to provide a different default programmatically when a flag value cannot be
* passed. The format of this value is expected to be a String in the form of ":port".
*
* In general, users should prefer setting the [[thriftPortFlag]] [[Flag]] value.
*
* @see [[com.twitter.finatra.thrift.ThriftServerTrait.thriftPortFlag]]
*/
protected def defaultThriftPort: String = ":9999"
/**
* External Thrift port [[Flag]]. The default value is specified by [[defaultThriftPort]] which
* can be overridden to provide a different default.
*
* @note the default value is ":9999" as defined by [[defaultThriftPort]].
* @note the format of this flag is expected to be a String in the form of ":port".
* @see [[com.twitter.finatra.thrift.ThriftServerTrait.defaultThriftPort]]
* @see [[https://twitter.github.io/finatra/user-guide/getting-started/flags.html#passing-flag-values-as-command-line-arguments]]
*/
private val thriftPortFlag: Flag[String] =
flag("thrift.port", defaultThriftPort, "External Thrift server port")
/**
* Default shutdown timeout used as the [[Flag]] default value for [[thriftShutdownTimeoutFlag]].
* This represents the deadline for the closing of this server which can be overridden to provide
* a different default programmatically when a flag value cannot be passed.
*
* In general, users should prefer setting the [[thriftShutdownTimeoutFlag]] [[Flag]] value.
*
* @note the value is used to denote a delta "from now", that is this value is applied as:
* `server.close(shutdownTimeoutDuration.fromNow())`
* @see [[com.twitter.finatra.thrift.ThriftServerTrait.thriftShutdownTimeoutFlag]]
* @see [[com.twitter.util.Closable.close(deadline: Time)]]
* @see [[https://github.com/twitter/util/blob/b0a5d06269b9526b4408239ce1441b2a213dd0df/util-core/src/main/scala/com/twitter/util/Duration.scala#L436]]
*/
protected def defaultThriftShutdownTimeout: Duration = 1.minute
/**
* Shutdown timeout [[Flag]]. The default value is specified by [[defaultThriftShutdownTimeout]]
* which can be overridden to provide a different default.
*
* @note the default value is "1.minute" as defined by [[defaultThriftShutdownTimeout]].
* @note the format of this flag is expected to be a String which is parsable into a [[com.twitter.util.Duration]].
* @see [[com.twitter.finatra.thrift.ThriftServerTrait.defaultThriftShutdownTimeout]]
* @see [[https://twitter.github.io/finatra/user-guide/getting-started/flags.html#passing-flag-values-as-command-line-arguments]]
*/
private val thriftShutdownTimeoutFlag: Flag[Duration] = flag(
"thrift.shutdown.time",
defaultThriftShutdownTimeout,
"Maximum amount of time to wait for pending requests to complete on shutdown"
)
/**
* Default server name for the external Thrift interface used as the [[Flag]] default value for
* [[thriftServerNameFlag]]. This can be overridden to provide a different default programmatically
* when a flag value cannot be passed.
*
* In general, users should prefer setting the [[thriftServerNameFlag]] [[Flag]] value.
*
* @see [[com.twitter.finatra.thrift.ThriftServerTrait.thriftServerNameFlag]]
*/
protected def defaultThriftServerName: String = "thrift"
/**
* Server name for the external Thrift interface [[Flag]]. The default value is specified by
* [[defaultThriftServerName]] which can be overridden to provide a different default.
*
* @note the default value is "thrift" as defined by [[defaultThriftServerName]].
* @see [[com.twitter.finatra.thrift.ThriftServerTrait.defaultThriftServerName]]
* @see [[https://twitter.github.io/finatra/user-guide/getting-started/flags.html#passing-flag-values-as-command-line-arguments]]
*/
private val thriftServerNameFlag: Flag[String] =
flag("thrift.name", defaultThriftServerName, "Thrift server name")
/**
* Default server announcement String used as the [[Flag]] default value for [[thriftAnnounceFlag]].
* This can be overridden to provide a different default programmatically when a flag value cannot
* be passed. An empty String value is an indication to not perform any announcement of the server.
*
* In general, users should prefer setting the [[thriftAnnounceFlag]] [[Flag]] value.
*
* @see [[com.twitter.finagle.ListeningServer.announce(addr: String)]]
*/
protected def defaultThriftAnnouncement: String = ThriftServerTrait.NoThriftAnnouncement
/**
* Server announcement String [[Flag]]. The default value is specified by [[defaultThriftAnnouncement]]
* which can be overridden to provide a different default. Setting an empty String is an indication
* to not perform any announcement of the server.
*
* @note the default value is "No Announcement" (empty String) as defined by [[defaultThriftAnnouncement]].
* @see [[com.twitter.finagle.ListeningServer.announce(addr: String)]]
* @see [[com.twitter.finatra.thrift.ThriftServerTrait.defaultThriftAnnouncement]]
* @see [[https://twitter.github.io/finatra/user-guide/getting-started/flags.html#passing-flag-values-as-command-line-arguments]]
*/
private val thriftAnnounceFlag: Flag[String] =
flag[String](
"thrift.announce",
defaultThriftAnnouncement,
"Address for announcing Thrift server. Empty string indicates no announcement."
)
/* Private Mutable State */
private var thriftServer: ListeningServer = NullServer
/* Abstract */
/**
* The Finagle `Service[Array[Byte], Array[Byte]]` to serve on the configured [[ListeningServer]].
*
* Users must override with an implementation to serve a `Service[Array[Byte], Array[Byte]]`.
*/
protected def thriftService: Service[Array[Byte], Array[Byte]]
/**
* The address to which the underlying [[ListeningServer]] is bound
*
* @note this returns [[None]] before the [[postWarmup()]] lifecycle phase is done or if the
* server fails to start up.
*/
protected final def boundAddress: Option[InetSocketAddress] =
if (thriftServer == NullServer)
None
else
Some(thriftServer.boundAddress.asInstanceOf[InetSocketAddress])
/* Lifecycle */
@Lifecycle
override protected def postWarmup(): Unit = {
super.postWarmup()
val baseSrv = ThriftMux.server
.withLabel(thriftServerNameFlag())
.withStatsReceiver(injector.instance[StatsReceiver].scope("srv"))
.withResponseClassifier(injector.instance[ThriftResponseClassifier])
// The inject.TwitterServer installs the TracerModule, which uses a bindOption for configuring a Tracer. This
// approach differs from the pattern used with StatsReceiver & StatsReceiverModule in order to work
// backwards compatibly with implementations that already bind their own customer Tracer.
// If a Tracer is bound to the object graph we configure it here, otherwise the default Tracer
// configuration is used. This is our primary integration point for verifying traces via test.
val srv = injector.instance[Option[Tracer]] match {
case Some(tracer) => baseSrv.withTracer(tracer)
case _ => baseSrv
}
thriftServer = build(
thriftPortFlag(),
frameworkConfigureServer(
configureThriftServer(
srv
)
)
)
onExit {
Await.result(thriftServer.close(thriftShutdownTimeoutFlag().fromNow))
}
await(thriftServer)
thriftAnnounceFlag() match {
case ThriftServerTrait.NoThriftAnnouncement => // no-op
case addr =>
info(s"thrift server announced to $addr")
thriftServer.announce(addr)
}
info(s"thrift server started on port ${PortUtils.getPort(thriftServer.boundAddress)}")
}
/* Overrides */
override def thriftPort: Option[Int] = Some(PortUtils.getPort(thriftServer))
/* Protected */
/**
* Default [[com.twitter.inject.TwitterModule]] for providing a [[ThriftResponseClassifier]].
*
* @return a [[com.twitter.inject.TwitterModule]] which provides a [[ThriftResponseClassifier]] implementation.
*/
protected def thriftResponseClassifierModule: Module = ThriftResponseClassifierModule
/**
* This method allows for further configuration of the thrift server for parameters not exposed by
* this trait or for overriding defaults provided herein, e.g.,
*
* override def configureThriftServer(server: ThriftMux.Server): ThriftMux.Server = {
* server
* .withMaxReusableBufferSize(...)
* }
*
* @param server - the [[com.twitter.finagle.ThriftMux.Server]] to configure.
* @return a configured ThriftMux.Server.
*/
protected def configureThriftServer(server: ThriftMux.Server): ThriftMux.Server = {
server
}
/* Configuration of the server reserved by the framework */
protected[finatra] def frameworkConfigureServer(server: ThriftMux.Server): ThriftMux.Server = {
server
}
/**
* Construct a [[com.twitter.finagle.ListeningServer]] from the given String addr
* and configured [[ThriftMux.Server]] stack.
*
* @param addr the [[String]] address to bind the resultant [[ListeningServer]].
* @param server the configured [[ThriftMux.Server]] stack.
* @return a constructed [[ListeningServer]].
*/
private[thrift] def build(addr: String, server: ThriftMux.Server): ListeningServer = {
server.serve(addr, this.thriftService)
}
}
/**
* A basic ThriftServer implemented by a {{{com.twitter.finagle.Service<byte[], byte[]>}}}.
*
* @note Scala users are encouraged to use [[ThriftServerTrait]] instead.
*/
abstract class AbstractThriftServerTrait extends AbstractTwitterServer with ThriftServerTrait
/**
* A Finagle server which exposes an external Thrift interface implemented by a
* {{{Service<byte[], byte[]>}}} configured via a [[ThriftRouter]]. This trait is
* intended for use from Scala or with generated Scala code.
*
* @note Java users are encouraged to use [[AbstractThriftServer]] instead.
*/
trait ThriftServer extends ThriftServerTrait {
/**
* Configuration of the `Service[Array[Byte], Array[Byte]]` to serve on the [[ListeningServer]]
* is defined by configuring the [[ThriftRouter]] and not by implementation of this method,
* thus this method overridden to be final and set to a `NilService`.
*/
final override protected def thriftService: Service[Array[Byte], Array[Byte]] =
NilService
/** Serve the `Service[Array[Byte], Array[Byte]]` from the configured [[ThriftRouter]]. */
override private[thrift] def build(addr: String, server: ThriftMux.Server): ListeningServer = {
val router = injector.instance[ThriftRouter]
server.serveIface(addr, router.thriftService)
}
/* Abstract */
/**
* Users MUST provide an implementation to configure the provided [[ThriftRouter]]. The [[ThriftRouter]]
* exposes a DSL which results in a configured Finagle `Service[-Req, +Rep]` to serve on
* the [[ListeningServer]].
*
* @param router the [[ThriftRouter]] to configure.
*/
protected def configureThrift(router: ThriftRouter): Unit
/* Lifecycle */
@Lifecycle
override protected def postInjectorStartup(): Unit = {
super.postInjectorStartup()
configureRouter()
}
/**
* Configure the appropriate router for this server.
* @note Users SHOULD NOT override/replace this method.
*/
private[thrift] def configureRouter(): Unit = {
configureThrift(injector.instance[ThriftRouter])
}
}
/**
* A Finagle server which exposes an external Thrift interface implemented by a
* `Service[Array[Byte], Array[Byte]]` configured via a [[JavaThriftRouter]]. This abstract class is
* intended for use from Java or with generated Java code.
*
* @note Scala users are encouraged to use [[ThriftServer]] instead.
*/
abstract class AbstractThriftServer extends AbstractTwitterServer with ThriftServer {
/** Serve the `Service[Array[Byte], Array[Byte]]` from the configured [[JavaThriftRouter]]. */
override private[thrift] final def build(
addr: String,
server: ThriftMux.Server
): ListeningServer = {
val router = injector.instance[JavaThriftRouter]
// Configure the service first so that serviceClazzStackParam is defined
// by the time it is checked (for Java servers configured via JavaThriftRouter)
val service = configureService(router.createService(server.serverParam))
router.getServiceClazzStackParam match {
case Some(serviceClazz) =>
server
.withServiceClass(serviceClazz)
.serve(addr, service)
case _ =>
server.serve(addr, service)
}
}
/** Users are expected to use [[configureThrift(router: JavaThriftRouter)]] */
override protected final def configureThrift(router: ThriftRouter): Unit =
throw new IllegalStateException("Use 'configureThrift(router: JavaThriftRouter)'")
/* Abstract */
/**
* Users MUST provide an implementation to configure the provided [[JavaThriftRouter]]. The [[JavaThriftRouter]]
* exposes a DSL which results in a configured Finagle `Service[-Req, +Rep]` to serve on the [[ListeningServer]].
*
* @param router the [[JavaThriftRouter]] to configure.
*/
protected def configureThrift(router: JavaThriftRouter): Unit
/* Protected */
/**
* Override to provide further configuration to the `Service[Array[Byte], Array[Byte]]` served.
* For example, to add "global" filters over the resultant `Service[Array[Byte], Array[Byte]]`.
*
* E.g.
*
* {{{
* override protected def configureService(
* service: Service[Array[Byte], Array[Byte]]
* ): Service[Array[Byte], Array[Byte]] = {
* injector.instance[MyGreatServiceFilter].andThen(service)
* }
* }}}
*/
protected def configureService(
service: Service[Array[Byte], Array[Byte]]
): Service[Array[Byte], Array[Byte]] = service
/* Lifecycle */
/**
* Configure the appropriate router for this server.
* @note Users SHOULD NOT override/replace this method.
*/
override private[thrift] final def configureRouter(): Unit = {
configureThrift(injector.instance[JavaThriftRouter])
}
}