-
Notifications
You must be signed in to change notification settings - Fork 12
/
WebSocketClient.scala
90 lines (74 loc) · 2.79 KB
/
WebSocketClient.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package sample.stream_actor
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws._
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source}
import akka.stream.{FlowShape, SourceShape}
import sample.stream_actor.WindTurbineSimulator._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/** Companion factory for [[WebSocketClient]]. */
object WebSocketClient {

  /**
    * Builds a [[WebSocketClient]]; the arguments are forwarded unchanged to the class constructor.
    *
    * @param id                   identifier passed to the client
    * @param endpoint             endpoint passed to the client
    * @param windTurbineSimulator actor reference passed to the client
    * @param system               actor system handed explicitly to the client constructor
    * @param executionContext     execution context handed explicitly to the client constructor
    * @return a newly constructed client
    */
  def apply(id: String, endpoint: String, windTurbineSimulator: ActorRef)
           (implicit system: ActorSystem, executionContext: ExecutionContext): WebSocketClient =
    new WebSocketClient(id, endpoint, windTurbineSimulator)(system, executionContext)
}
/**
  * WebSocket client that connects to `endpoint/measurements/id`, sends the text produced by
  * `WindTurbineData(id).getNext` at a fixed rate, prints every text message it receives and
  * reports the connection lifecycle to `windTurbineSimulator`.
  *
  * The stream is materialized and started while the instance is being constructed.
  *
  * Messages sent to `windTurbineSimulator`:
  *  - `Upgraded` or `FailedUpgrade(status)`, depending on the status of the upgrade response;
  *  - `Connected` once the upgrade response has been processed, or `ConnectionFailure(ex)`
  *    if obtaining/processing the upgrade response failed;
  *  - `Terminated` when the stream completes normally, or `ConnectionFailure(ex)` when it
  *    completes with a failure.
  *
  * @param id                   identifier used in the WebSocket URI and passed to `WindTurbineData`
  * @param endpoint             base URI; `/measurements/id` is appended to it
  * @param windTurbineSimulator actor that receives the lifecycle messages listed above
  * @param system               actor system used by `Http()` and to run the streams
  * @param executionContext     execution context for the future callbacks registered here
  */
class WebSocketClient(id: String, endpoint: String, windTurbineSimulator: ActorRef)
                     (implicit
                      system: ActorSystem,
                      executionContext: ExecutionContext) {

  /** Client-side WebSocket flow; materializes the future upgrade response. */
  val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = {
    val websocketUri = s"$endpoint/measurements/$id"
    Http().webSocketClientFlow(WebSocketRequest(websocketUri))
  }

  /** Outgoing side: after a 1 second delay, emits one text message every 100 ms. */
  val outgoing = GraphDSL.create() { implicit builder =>
    val data = WindTurbineData(id)

    val flow = builder.add {
      Source.tick(1.second, 100.millis, ()) // valve for the WindTurbineData frequency
        .map(_ => TextMessage(data.getNext))
    }

    SourceShape(flow.out)
  }

  /**
    * Incoming side: turns each text message into its full text and prints it.
    *
    * Messages that are not text messages are dropped by `collect`.
    * NOTE(review): dropped messages are not drained here; confirm the server never sends
    * streamed binary messages, otherwise their data should be consumed.
    */
  val incoming = GraphDSL.create() { implicit builder =>
    val flow = builder.add {
      Flow[Message]
        .collect {
          case TextMessage.Strict(text) =>
            Future.successful(text)
          case TextMessage.Streamed(textStream) =>
            // Concatenate the chunks of a streamed message into a single string.
            textStream.runFold("")(_ + _)
        }
        .mapAsync(1)(identity) // parallelism 1: resolve one message at a time, in order
        .map(each => println(s"Client received msg: $each"))
    }

    FlowShape(flow.in, flow.out)
  }

  // Run the stream: `upgradeResponse` completes with the server's upgrade response,
  // `closed` completes when the stream terminates.
  val (upgradeResponse, closed) = Source.fromGraph(outgoing)
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
    .via(incoming)
    .toMat(Sink.ignore)(Keep.both) // also keep the Future[Done]
    .run()

  /** Completes after the simulator has been told whether the protocol upgrade succeeded. */
  val connected: Future[Unit] =
    upgradeResponse.map { upgrade =>
      upgrade.response.status match {
        case StatusCodes.SwitchingProtocols => windTurbineSimulator ! Upgraded
        case statusCode                     => windTurbineSimulator ! FailedUpgrade(statusCode)
      }
    }

  // NOTE(review): `connected` succeeds for any upgrade status, so `Connected` is also sent
  // after `FailedUpgrade`; kept as-is because the simulator's handling is not visible here.
  connected.onComplete {
    case Success(_)  => windTurbineSimulator ! Connected
    case Failure(ex) => windTurbineSimulator ! ConnectionFailure(ex)
  }

  // A normally completed stream means the connection is gone: report `Terminated` only.
  // (Previously `Connected` was sent here as well, in unspecified order relative to `Terminated`.)
  closed.onComplete {
    case Success(_)  => windTurbineSimulator ! Terminated
    case Failure(ex) => windTurbineSimulator ! ConnectionFailure(ex)
  }
}