This repository has been archived by the owner on Apr 24, 2024. It is now read-only.
/
DemoService.scala
188 lines (167 loc) · 6.62 KB
/
DemoService.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package spray.examples
import java.io.File
import org.parboiled.common.FileUtils
import scala.concurrent.duration._
import akka.actor.{Props, Actor}
import akka.pattern.ask
import spray.routing.{HttpService, RequestContext}
import spray.routing.directives.CachingDirectives
import spray.can.server.Stats
import spray.can.Http
import spray.httpx.marshalling.Marshaller
import spray.httpx.encoding.Gzip
import spray.util._
import spray.http._
import MediaTypes._
import CachingDirectives._
// we don't implement our route structure directly in the service actor because
// we want to be able to test it independently, without having to spin up an actor
class DemoServiceActor extends Actor with DemoService {

  // HttpService leaves a single member abstract; handing it this actor's context
  // connects the service's environment to the enclosing actor (a test supplies its own)
  def actorRefFactory = context

  // All this actor does is run the route. Request stream processing, timeout
  // handling or alternative handler registration could be added here as well.
  def receive: Receive = runRoute(demoRoute)
}
// this trait defines our service behavior independently from the service actor
trait DemoService extends HttpService {
// Futures and scheduled tasks created by this service run on the dispatcher of the
// enclosing ActorContext or ActorSystem (whatever `actorRefFactory` resolves to).
implicit def executionContext = {
  actorRefFactory.dispatcher
}
// The complete route of the demo: a set of GET resources plus a "stop" resource that is
// reachable via POST or via a `method=post` query parameter.
val demoRoute = {

  // resources served for GET requests
  val getRoutes =
    get {
      path("") {
        complete(index)
      } ~
      path("ping") {
        complete("PONG!")
      } ~
      path("stream1") {
        // simpleStringStream blocks while its elements are produced,
        // so detach() moves that work into a future
        detach() {
          complete(simpleStringStream)
        }
      } ~
      path("stream2") {
        sendStreamingResponse
      } ~
      path("stream-large-file") {
        encodeResponse(Gzip) {
          getFromFile(largeTempFile)
        }
      } ~
      path("stats") {
        complete {
          val listener = actorRefFactory.actorFor("/user/IO-HTTP/listener-0")
          listener.ask(Http.GetStats)(1.second).mapTo[Stats]
        }
      } ~
      path("timeout") { ctx =>
        // intentionally empty: the request is simply dropped to provoke a timeout
      } ~
      path("cached") {
        cache(simpleRouteCache) { ctx =>
          // the response is produced with a delay of 1.5 seconds
          in(1500.millis) {
            val producedAt = DateTime.now.toIsoDateTimeString
            ctx.complete(
              "This resource is only slow the first time!\n" +
                "It was produced on " + producedAt + "\n\n" +
                "(Note that your browser will likely enforce a cache invalidation with a\n" +
                "`Cache-Control: max-age=0` header when you click 'reload', so you might need to `curl` this\n" +
                "resource in order to be able to see the cache effect!)")
          }
        }
      } ~
      path("crash") { ctx =>
        // throws from inside the route function
        sys.error("crash boom bang")
      } ~
      path("fail") {
        failWith(new RuntimeException("aaaahhh"))
      }
    }

  // shuts the actor system down one second after the request was received
  val stopRoute =
    (post | parameter('method ! "post")) {
      path("stop") {
        complete {
          in(1.second) { actorSystem.shutdown() }
          "Shutting down in 1 second..."
        }
      }
    }

  getRoutes ~ stopRoute
}
// Cache handed to the `cache` directive of the "cached" route in `demoRoute`.
// Created lazily on first access via `routeCache()` with no explicit arguments,
// i.e. with whatever defaults that factory defines.
lazy val simpleRouteCache = routeCache()
// Landing page completed by the root route (`path("")`) in `demoRoute`: an XML literal
// holding an HTML list with one link per demo resource.
// The "/stop" link carries `?method=post`, which matches the `parameter('method ! "post")`
// alternative of the stop route, so it can be triggered by following a plain link.
lazy val index =
<html>
<body>
<h1>Say hello to <i>spray-routing</i> on <i>spray-can</i>!</h1>
<p>Defined resources:</p>
<ul>
<li><a href="/ping">/ping</a></li>
<li><a href="/stream1">/stream1</a> (via a Stream[T])</li>
<li><a href="/stream2">/stream2</a> (manually)</li>
<li><a href="/stream-large-file">/stream-large-file</a></li>
<li><a href="/stats">/stats</a></li>
<li><a href="/timeout">/timeout</a></li>
<li><a href="/cached">/cached</a></li>
<li><a href="/crash">/crash</a></li>
<li><a href="/fail">/fail</a></li>
<li><a href="/stop?method=post">/stop</a></li>
</ul>
</body>
</html>
// Opening part of both streaming responses. It starts with 2048 padding characters that
// carry no content, which pushes the browser to start displaying incoming chunks immediately.
lazy val streamStart = {
  val padding = " " * 2048
  padding + "<html><body><h2>A streaming response</h2><p>(for 15 seconds)<ul>"
}

// Closing part of both streaming responses: ends the list opened by `streamStart`.
lazy val streamEnd = "</ul><p>Finished.</p></body></html>"
// Builds the body of the "stream1" response as a Stream: the padded HTML prologue,
// fifteen timestamp list items and finally the closing markup.
def simpleStringStream: Stream[String] = {
  // CAUTION: producing an item blocks for 500 ms so that the stream generation can be
  // followed in the browser. This is done for the purpose of this demo only;
  // blocking in actor code should otherwise be avoided.
  def nextItem(): String = {
    Thread.sleep(500)
    "<li>" + DateTime.now.toIsoDateTimeString + "</li>"
  }

  val timestamps = Stream.continually(nextItem())
  streamStart #:: (timestamps.take(15) #::: (streamEnd #:: Stream.empty))
}
// Send confirmation message for streaming chunks: `sendStreamingResponse` attaches an
// instance to each response part via `withAck` and receives it back once that part was sent.
// `remaining` is the countdown value; each acknowledged chunk requests the next
// acknowledgement with `remaining - 1`, and `Ok(0)` makes the streaming actor finish the response.
case class Ok(remaining: Int)
// Streams a chunked HTML response for the given request "manually": a dedicated
// per-request actor sends the response start and then, driven by the acknowledgement of
// each sent part, one timestamp chunk every 500 ms until the countdown reaches zero.
//
// The actor stops itself when the response has been completed and also when the
// connection is closed prematurely, so it never outlives the response it serves.
def sendStreamingResponse(ctx: RequestContext): Unit =
  actorRefFactory.actorOf {
    Props {
      new Actor with SprayActorLogging {
        // we use the successful sending of a chunk as trigger for scheduling the next chunk;
        // the acknowledgement of the response start kicks off the countdown at 16
        val responseStart = HttpResponse(entity = HttpEntity(`text/html`, streamStart))
        ctx.responder ! ChunkedResponseStart(responseStart).withAck(Ok(16))

        def receive = {
          case Ok(0) =>
            // countdown finished: send the closing markup, end the chunked message, clean up
            ctx.responder ! MessageChunk(streamEnd)
            ctx.responder ! ChunkedMessageEnd
            context.stop(self)

          case Ok(remaining) =>
            // previous part was sent: schedule the next timestamp chunk and count down
            in(500.millis) {
              val nextChunk = MessageChunk("<li>" + DateTime.now.toIsoDateTimeString + "</li>")
              ctx.responder ! nextChunk.withAck(Ok(remaining - 1))
            }

          case ev: Http.ConnectionClosed =>
            log.warning("Stopping response streaming due to {}", ev)
            // no further acknowledgements will arrive on a closed connection, so without
            // stopping here this per-request actor would stay alive forever
            context.stop(self)
        }
      }
    }
  }
// Marshals server statistics as `text/plain`: one "label : value" pair per line,
// produced by delegating to the String marshaller.
implicit val statsMarshaller: Marshaller[Stats] =
  Marshaller.delegate[Stats, String](ContentTypes.`text/plain`) { stats =>
    val lines = Seq(
      "Uptime : " + stats.uptime.formatHMS,
      "Total requests : " + stats.totalRequests,
      "Open requests : " + stats.openRequests,
      "Max open requests : " + stats.maxOpenRequests,
      "Total connections : " + stats.totalConnections,
      "Open connections : " + stats.openConnections,
      "Max open connections : " + stats.maxOpenConnections,
      "Requests timed out : " + stats.requestTimeouts
    )
    // every line, including the last one, is terminated by a newline
    lines.mkString("", "\n", "\n")
  }
// Temporary text file served by the "stream-large-file" route. It is created lazily on
// first access and filled with 1000 lines of the form "This is line <n>".
lazy val largeTempFile: File = {
  val file = File.createTempFile("streamingTest", ".txt")
  // register the clean-up before writing, so the file is removed on JVM exit
  // even if filling it fails
  file.deleteOnExit()
  val content = (1 to 1000).map("This is line " + _).mkString("\n")
  FileUtils.writeAllText(content, file)
  file
}
// Runs `body` once after the given delay, using the scheduler of the actor system.
// The result of `body` is discarded, as is the handle returned by the scheduler.
def in[U](duration: FiniteDuration)(body: => U): Unit = {
  val scheduler = actorSystem.scheduler
  scheduler.scheduleOnce(duration)(body)
}
}