/
App.scala
266 lines (229 loc) · 8.52 KB
/
App.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
package com.twitter.app
import com.twitter.conversions.time._
import com.twitter.util._
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference
import java.util.logging.Logger
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* A composable application trait that includes flag parsing as well
* as basic application lifecycle (pre- and post- main). Flag parsing
* is done via [[com.twitter.app.Flags]], an instance of which is
* defined in the member `flag`. Applications should be constructed
* with modularity in mind, and common functionality should be
* extracted into mixins.
*
* Flags should only be constructed in the constructor, and should only be read
* in the premain or later, after they have been parsed.
*
* {{{
* object MyApp extends App {
*   val n = flag("n", 100, "Number of items to process")
*
*   def main(): Unit = {
*     for (i <- 0 until n()) process(i)
*   }
* }
* }}}
*
* Note that a missing `main` is OK: mixins may provide behavior that
* does not require defining a custom `main` method.
*/
trait App extends Closable with CloseAwaitably {
/**
 * The name of the application: the runtime class name with any trailing `$`
 * (as carried by the class behind a Scala `object`) stripped.
 */
val name: String = getClass.getName.stripSuffix("$")
/**
 * The [[com.twitter.app.Flags]] instance associated with this application.
 *
 * @note The overridable `failfastOnFlagsNotParsed` is read right here, while this
 *       trait is still being initialized and before any subclass initializer has
 *       run. Overriding it with a `def` is safe; a `val` override would typically
 *       be read before the subclass has assigned it. Background:
 *       http://stackoverflow.com/questions/18138397/calling-method-from-constructor
 */
val flag: Flags = new Flags(name, includeGlobal = true, failfastOnFlagsNotParsed)
// Backing field for `args`; reassigned by `nonExitingMain` once flag parsing succeeds.
private var _args = Array[String]()
/** The remaining, unparsed arguments (empty until flags have been parsed). */
def args: Array[String] = _args
/** Whether or not to accept undefined flags; handed to `flag.parseArgs`. */
protected def allowUndefinedFlags: Boolean = false
/**
 * Users of this code should override this to `true` so that
 * you fail-fast instead of being surprised at runtime by code that
 * is reading from flags before they have been parsed.
 *
 * Ideally this would default to `true`, however, in order to avoid
 * breaking existing users, it was introduced using `false`.
 *
 * @note The value is read while `flag` is being constructed, i.e. during this
 *       trait's initialization. Override it with a `def`; a `val` override would
 *       typically not have been assigned yet at that point and would read as `false`.
 */
protected def failfastOnFlagsNotParsed: Boolean = false
/**
 * Report a fatal error and terminate the JVM with exit status 1.
 *
 * `reason` is written to standard error, the application is closed so that
 * registered exit hooks get a chance to run, and then `System.exit(1)` is called.
 *
 * The close is awaited (bounded by `closeDeadline`) instead of being fired and
 * forgotten; otherwise the JVM would usually be torn down before any asynchronous
 * exit hook had run. The exit sits in a `finally` so that neither a timeout nor a
 * failure while closing can keep the process alive.
 *
 * @param reason human-readable description of the failure
 */
protected def exitOnError(reason: String): Unit = {
  System.err.println(reason)
  // Make sure the reason is visible even if closing stalls.
  System.err.flush()
  try {
    // The first invocation of `close` assigns `closeDeadline` before returning, so it
    // has to be called before the remaining time is computed.
    val closed = close()
    // Clamp at zero: the deadline may already have passed if shutdown began earlier.
    Await.ready(closed, (closeDeadline - Time.now).max(Duration.Zero))
  } finally {
    System.exit(1)
  }
}
// Thunks registered through `init`; `nonExitingMain` runs them before parsing flags.
private val inits: mutable.Buffer[() => Unit] = mutable.Buffer.empty[() => Unit]
// Thunks registered through `premain`; `nonExitingMain` runs them after parsing flags.
private val premains: mutable.Buffer[() => Unit] = mutable.Buffer.empty[() => Unit]
// Closables for the first shutdown phase (see `closeOnExit`).
private val exits: ConcurrentLinkedQueue[Closable] = new ConcurrentLinkedQueue[Closable]()
// Closables for the second shutdown phase (see `closeOnExitLast`).
private val lastExits: ConcurrentLinkedQueue[Closable] = new ConcurrentLinkedQueue[Closable]()
// Thunks registered through `postmain`; run once the user's main has returned.
private val postmains: ConcurrentLinkedQueue[() => Unit] =
  new ConcurrentLinkedQueue[() => Unit]()
// finagle isn't available here, so no DefaultTimer
protected lazy val shutdownTimer: Timer = new JavaTimer(isDaemon = true)
/**
 * Register `f` to be run by `nonExitingMain` ahead of flag parsing.
 *
 * @param f by-name body; it is evaluated when the hook runs, not at registration
 */
protected final def init(f: => Unit): Unit = {
  val hook: () => Unit = () => f
  inits += hook
}
/**
 * Register `f` to be run after flags have been parsed and just before the
 * user's main is invoked.
 *
 * @param f by-name body; it is evaluated when the hook runs, not at registration
 */
protected final def premain(f: => Unit): Unit = {
  val hook: () => Unit = () => f
  premains += hook
}
/**
 * Minimum duration to allow for exits to be processed: `close` never uses a
 * deadline earlier than `Time.now + MinGrace`.
 */
final val MinGrace: Duration = 1.second
/**
 * Default amount of time to wait for shutdown.
 * This value is not used as a default if `close()` is called without parameters. It simply
 * provides a default value to be passed as `close(grace)`; `nonExitingMain` passes it
 * when it closes the application after the postmains have run.
 */
def defaultCloseGracePeriod: Duration = Duration.Zero
/**
 * The absolute point in time by which shutdown is expected to have finished.
 *
 * Starts out as `Time.Top` and is assigned by `close(deadline)`. It is `@volatile`
 * because it is also read outside the intrinsic lock (e.g. by `nonExitingMain`).
 */
@volatile private[this] var closeDeadline = Time.Top
/**
 * This is satisfied when all members of `exits` and `lastExits` have closed.
 *
 * Holds the sentinel `Future.never` until `close` has been called; `closeOnExit`
 * and `closeOnExitLast` compare against that sentinel to detect shutdown.
 *
 * @note Access needs to be mediated via the intrinsic lock.
 */
private[this] var closing: Future[Unit] = Future.never
/**
 * Arrange for `closable` to be closed in the first shutdown phase. All closables
 * registered this way are closed in parallel.
 *
 * If shutdown has already been requested, `closable` is closed right away using
 * the current close deadline instead of being queued.
 */
final def closeOnExit(closable: Closable): Unit = synchronized {
  // `closing` keeps the `Future.never` sentinel until `close()` has been called.
  val shutdownRequested = closing != Future.never
  if (shutdownRequested) {
    // Too late to enqueue: close it here, immediately.
    closable.close(closeDeadline)
  } else {
    exits.add(closable)
  }
}
/**
 * Arrange for `closable` to be closed in the second shutdown phase, i.e. after
 * everything registered via `closeOnExit`.
 *
 * @note Shutdown happens in two sequential phases so that resource lifecycle
 *       relationships can be expressed explicitly, for instance a monitoring
 *       resource that has to outlive the resource it monitors.
 *
 * The close deadline is enforced in every case.
 */
final def closeOnExitLast(closable: Closable): Unit = synchronized {
  // `closing` keeps the `Future.never` sentinel until `close()` has been called.
  val shutdownRequested = closing != Future.never
  if (shutdownRequested) {
    // Shutdown is under way: close `closable` only once `closing` has completed
    // (successfully or not), still bounded by the deadline.
    val closedAfterShutdown = closing.transform { _ =>
      closable.close(closeDeadline)
    }
    closedAfterShutdown.by(shutdownTimer, closeDeadline)
  } else {
    lastExits.add(closable)
  }
}
/**
 * Run `f` when shutdown is requested.
 *
 * The body is wrapped in a `Closable` and registered through `closeOnExit`, so it
 * takes part in the first shutdown phase alongside the other exit hooks. It is
 * executed on `FuturePool.unboundedPool` and bounded by the deadline handed to
 * the closable.
 */
protected final def onExit(f: => Unit): Unit = {
  // close() ensures that the deadline passed in here is sane
  val exitHook = Closable.make { deadline =>
    val running = FuturePool.unboundedPool(f)
    running.by(shutdownTimer, deadline)
  }
  closeOnExit(exitHook)
}
/**
 * Register `f` to be run once the user's main has returned.
 *
 * @param f by-name body; it is evaluated when the hook runs, not at registration
 */
protected final def postmain(f: => Unit): Unit = {
  val hook: () => Unit = () => f
  postmains.add(hook)
}
/**
 * Tell the application that it may stop running.
 *
 * Shutdown runs in two phases: everything registered via `closeOnExit` is closed
 * first, then everything registered via `closeOnExitLast`. Both phases are bounded
 * by the effective deadline, which is never earlier than `Time.now + MinGrace`.
 *
 * @return a Future that is satisfied when the App has been torn down, or that
 *         errors at the deadline
 */
final def close(deadline: Time): Future[Unit] = synchronized {
  closing = closeAwaitably {
    // Always leave at least `MinGrace` for the exits to be processed.
    closeDeadline = deadline.max(Time.now + MinGrace)

    // Phase one: the regular exits.
    val regularExits = Closable.all(exits.asScala.toSeq: _*)
    val phaseOne = regularExits.close(closeDeadline).by(shutdownTimer, closeDeadline)

    // Phase two: the "last" exits, started once phase one has finished either way.
    val phaseTwo = phaseOne.transform { _ =>
      val finalExits = Closable.all(lastExits.asScala.toSeq: _*)
      finalExits.close(closeDeadline)
    }
    phaseTwo.by(shutdownTimer, closeDeadline)
  }
  closing
}
/**
 * JVM entry point. Delegates to `nonExitingMain` and turns any failure into a
 * call to `exitOnError`.
 *
 * Flag usage and flag parse failures are reported with their own message; any
 * other throwable has its stack trace printed and is reported generically.
 *
 * @param args the raw command-line arguments
 */
final def main(args: Array[String]): Unit =
  try nonExitingMain(args)
  catch {
    case FlagUsageError(usage) => exitOnError(usage)
    case FlagParseException(message, _) => exitOnError(message)
    // Deliberately broad: whatever went wrong, report it and exit.
    case failure: Throwable =>
      failure.printStackTrace()
      exitOnError("Exception thrown in main on startup")
  }
/**
 * Run the full application lifecycle without calling `exitOnError` on failure;
 * errors are propagated to the caller as exceptions instead.
 *
 * Order of events: register this App, run the `init` hooks, parse flags, run the
 * `premain` hooks, invoke the user's no-argument `main()` if one is defined, run
 * the `postmain` hooks, then close the application and wait for it to finish.
 *
 * @param args the raw command-line arguments
 */
final def nonExitingMain(args: Array[String]): Unit = {
  App.register(this)

  inits.foreach(hook => hook())

  val parsed = flag.parseArgs(args, allowUndefinedFlags)
  parsed match {
    case Flags.Ok(leftover) => _args = leftover.toArray
    case Flags.Help(usage) => throw FlagUsageError(usage)
    case Flags.Error(reason) => throw FlagParseException(reason)
  }

  premains.foreach(hook => hook())

  // A main() is optional: traits may contribute behaviour purely via pre/post mains.
  val userMain =
    try Some(getClass.getMethod("main"))
    catch { case _: NoSuchMethodException => None }

  userMain.foreach { entryPoint =>
    // Reflection wraps whatever main() throws; surface the underlying cause.
    try entryPoint.invoke(this)
    catch { case wrapped: InvocationTargetException => throw wrapped.getCause }
  }

  postmains.asScala.foreach(hook => hook())

  // The returned future is discarded on purpose: `this` is a `CloseAwaitable`, so
  // awaiting `this` below blocks the thread until that same future completes.
  close(defaultCloseGracePeriod)

  // The deadline to 'close' is advisory; we enforce it here.
  Await.result(this, closeDeadline - Time.now)
}
}
object App {
  private[this] val log = Logger.getLogger(getClass.getName)

  // Holds the most recently registered App; `None` until the first registration.
  private[this] val ref = new AtomicReference[Option[App]](None)

  /**
   * The currently registered App, if any. A single running App per process is
   * expected; should more than one have registered, the most recently
   * registered one is returned.
   */
  def registered: Option[App] = ref.get

  /**
   * Record `app` as the registered App, logging a warning when it displaces a
   * previously registered one.
   */
  private[app] def register(app: App): Unit = {
    val previous = ref.getAndSet(Some(app))
    previous match {
      case Some(existing) =>
        log.warning(
          s"Multiple com.twitter.app.App main methods called. ${existing.name}, then ${app.name}"
        )
      case None => ()
    }
  }
}