/
SerialPortActor.scala
113 lines (96 loc) · 3.3 KB
/
SerialPortActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package framboos.actor
import akka.actor._
import framboos._
import scala.concurrent._
import scala.async.Async.{ async, await }
import ExecutionContext.Implicits.global
import purejavacomm._
import java.io._
import CommonMessages._
object SerialPortActor {
def props(portName: String): Props = Props(new SerialPortActor(portName))
/** Message to be sent over serial connection */
case class SendMessage(message: String) extends Incoming
/** Message received over serial connection */
case class ReceiveMessage(message: String) extends Outgoing
}
class SerialPortActor(portName: String) extends Actor with ActorLogging {
import SerialPortActor._
var listeners = Set.empty[ActorRef]
def receive = connecting
connect
def connect = async {
findPort(portName) match {
case Some(port) => {
log.info(s"Port found: $portName")
val in = new BufferedReader(new InputStreamReader(port.getInputStream))
port.addEventListener(new SerialPortEventListener {
def serialEvent(event: SerialPortEvent) {
if (event.getEventType == SerialPortEvent.DATA_AVAILABLE) {
log.debug("New serial input")
while (in.ready) {
val nextLine = in.readLine
log.debug(s"Receiving message: $nextLine")
listeners foreach { _ ! ReceiveMessage(nextLine) }
}
}
}
})
val out = new BufferedWriter(new OutputStreamWriter(port.getOutputStream))
out.write("Hello from SerialPortActor\n")
out.flush
context.become(connected(in, out), true)
}
case None => {
log.error(s"Could not find port $portName")
}
}
}
def findPort(portName: String): Option[SerialPort] = {
import java.util._
def findPort0(ports: Enumeration[CommPortIdentifier]): Option[SerialPort] = {
if (ports.hasMoreElements) {
val nextPortId: CommPortIdentifier = ports.nextElement
if (nextPortId.getName().equalsIgnoreCase(portName)) {
Some(openPort(nextPortId))
} else {
findPort0(ports)
}
} else {
None
}
}
findPort0(CommPortIdentifier.getPortIdentifiers.asInstanceOf[Enumeration[CommPortIdentifier]])
}
def openPort(portId: CommPortIdentifier): SerialPort = {
val port: SerialPort = portId.open("SerialPortActor", 1000).asInstanceOf[SerialPort]
port.notifyOnDataAvailable(true)
port.setFlowControlMode(SerialPort.FLOWCONTROL_XONXOFF_IN + SerialPort.FLOWCONTROL_XONXOFF_OUT)
port
}
import SerialPortActor._
val connecting: Receive = {
case AddListener(listener: ActorRef) => {
listeners = listeners + listener
}
case RemoveListener(listener: ActorRef) => {
listeners = listeners - listener
}
case SendMessage(message: String) => async {
log.warning(s"Not connected, could not deliver message: $message")
}
}
def connected(in: BufferedReader, out: BufferedWriter): Receive = {
case AddListener(listener: ActorRef) => {
listeners = listeners + listener
}
case RemoveListener(listener: ActorRef) => {
listeners = listeners - listener
}
case SendMessage(message: String) => async {
log.debug(s"Sending message: $message")
out.write(message + '\n')
out.flush
}
}
}