Skip to content

Commit 78d0667

Browse files
committed
flushing outstream take 2
1 parent 8d69d92 commit 78d0667

File tree

1 file changed

+22
-16
lines changed

1 file changed

+22
-16
lines changed

src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,18 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
108108
// close them after the async work in the Future is all done.
109109
// If we do something synchronous with Using, then there's a race condition where the
110110
// streams can get closed before the Future is completed.
111-
var outStream: ByteArrayOutputStream = null
112-
var out: PrintStream = null
111+
var maybeOutStream: Option[ByteArrayOutputStream] = None
112+
var maybeOut: Option[PrintStream] = None
113+
114+
def flushOut(): Unit = {
115+
maybeOut.map(_.flush())
116+
}
113117

114118
val workTask = CancellableTask {
115-
outStream = new ByteArrayOutputStream
116-
out = new PrintStream(outStream)
119+
val outStream = new ByteArrayOutputStream()
120+
val out = new PrintStream(outStream)
121+
maybeOutStream = Some(outStream)
122+
maybeOut = Some(out)
117123
try {
118124
work(ctx, args, out, sandboxDir, verbosity)
119125
0
@@ -128,15 +134,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
128134
.andThen {
129135
// Work task succeeded or failed in an expected way
130136
case Success(code) =>
131-
out.flush()
132-
writeResponse(requestId, Some(outStream), Some(code))
137+
flushOut()
138+
writeResponse(requestId, maybeOutStream, Some(code))
133139
System.err.println(s"WorkResponse $requestId sent with code $code")
134140

135141
case Failure(e: ExecutionException) =>
136142
e.getCause() match {
137143
// Task successfully cancelled
138144
case cancelError: InterruptedException =>
139-
out.flush()
145+
flushOut()
140146
writeResponse(requestId, None, None, wasCancelled = true)
141147
System.err.println(
142148
s"Cancellation WorkResponse sent for request id: $requestId in response to an" +
@@ -145,9 +151,9 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
145151

146152
// Work task threw a non-fatal error
147153
case e =>
148-
e.printStackTrace(out)
149-
out.flush()
150-
writeResponse(requestId, Some(outStream), Some(-1))
154+
maybeOut.map(e.printStackTrace(_))
155+
flushOut()
156+
writeResponse(requestId, maybeOutStream, Some(-1))
151157
System.err.println(
152158
"Encountered an uncaught exception that was wrapped in an ExecutionException while" +
153159
s" proccessing the Future for WorkRequest $requestId. This usually means a non-fatal" +
@@ -158,7 +164,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
158164

159165
// Task successfully cancelled
160166
case Failure(e: CancellationException) =>
161-
out.flush()
167+
flushOut()
162168
writeResponse(requestId, None, None, wasCancelled = true)
163169
System.err.println(
164170
s"Cancellation WorkResponse sent for request id: $requestId in response to a" +
@@ -167,15 +173,15 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
167173

168174
// Work task threw an uncaught exception
169175
case Failure(e) =>
170-
e.printStackTrace(out)
171-
out.flush()
172-
writeResponse(requestId, Some(outStream), Some(-1))
176+
maybeOut.map(e.printStackTrace(_))
177+
flushOut()
178+
writeResponse(requestId, maybeOutStream, Some(-1))
173179
System.err.println(s"Uncaught exception in Future while proccessing WorkRequest $requestId:")
174180
e.printStackTrace(System.err)
175181
}(scala.concurrent.ExecutionContext.global)
176182
.andThen { case _ =>
177-
out.close()
178-
outStream.close()
183+
maybeOut.map(_.close())
184+
maybeOutStream.map(_.close())
179185
}(scala.concurrent.ExecutionContext.global)
180186

181187
// putIfAbsent will return a non-null value if there was already a value in the map

0 commit comments

Comments
 (0)