forked from ryantanner/websocketchat-redis
-
Notifications
You must be signed in to change notification settings - Fork 1
/
RedisService.scala
148 lines (127 loc) · 4.2 KB
/
RedisService.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package models
import java.io.OutputStream
import com.redis.RedisClient
import com.redis.RedisClientPool
import com.redis.{ PubSubMessage, S, U, M, E}
import play.api.Play
import play.api.Play.current
import play.api.Logger
import play.api.libs.iteratee.Iteratee
import play.api.libs.iteratee.Concurrent
import play.api.libs.concurrent.Akka
import akka.actor.Props
import akka.actor.Actor
import akka.actor.PoisonPill
import play.api.libs.concurrent.Execution.Implicits.defaultContext
/**
 * Wraps a `RedisClientPool` that is configured from a Redis connection URL.
 *
 * @param redisUrl connection URL such as `redis://user:password@host:6379`;
 *                 the `user:password@` part and the port are optional
 */
class RedisService(redisUrl: String) {

  private val (host, port, pool) = {
    val uri = new java.net.URI(redisUrl)
    val host = uri.getHost
    // URI.getPort returns -1 when the URL has no explicit port; fall back to
    // the standard Redis port instead of handing -1 to the pool.
    val port = if (uri.getPort == -1) 6379 else uri.getPort
    // URI.getUserInfo returns null when the URL carries no credentials, so it
    // must be wrapped before it is split.
    val secret = Option(uri.getUserInfo).flatMap { userInfo =>
      userInfo.split(":").toList match {
        case _ :: password :: Nil => Some(password)
        case _ => None
      }
    }
    val pool = new RedisClientPool(host, port, secret=secret)
    Logger.info(s"Redis host: $host, Redis port: $port")
    (host, port, pool)
  }

  /** Runs `body` with a client taken from the pool and returns its result. */
  def withClient[T](body: RedisClient => T) = pool.withClient(body)

  /** Borrows a client from the underlying pool; pair with [[returnClient]]. */
  def borrowClient = pool.pool.borrowObject

  /** Hands a client obtained through [[borrowClient]] back to the underlying pool. */
  def returnClient(client: RedisClient) = pool.pool.returnObject(client)

  /** Closes the client pool. */
  def close = pool.close

  /**
   * Creates a [[PubSubChannel]] bound to `channel`.
   *
   * `send`, `receive`, `exception`, `subscribe` and `unsubscribe` are wrapped
   * with `Option(...)`, so passing `null` turns them into `None`.
   * `disconnect` is a by-name parameter and is always passed on as a thunk
   * inside `Some`; when it is left at its default the thunk evaluates to `null`.
   *
   * @param channel     name of the Redis channel
   * @param send        optional transformation applied to outgoing messages
   * @param receive     optional transformation applied to incoming messages
   * @param disconnect  message expression evaluated when the channel's input completes
   * @param exception   handler for subscriber errors; logs the error by default
   * @param subscribe   handler for subscribe notifications; logs by default
   * @param unsubscribe handler for unsubscribe notifications; logs by default
   */
  def createPubSub(channel: String,
    send: String => String = null,
    receive: String => String = null,
    disconnect: => String = null,
    exception: Throwable => Unit = defaultException,
    subscribe: (String, Int) => Unit = defaultSubscribe,
    unsubscribe: (String, Int) => Unit = defaultUnsubscribe
  ) = new PubSubChannel(this, channel,
    Option(send), Option(receive), Option(() => disconnect),
    Option(exception), Option(subscribe), Option(unsubscribe)
  )

  /** Default error handler: logs the throwable at error level. */
  private def defaultException: Throwable => Unit = { ex =>
    Logger.error("Subscriber error", ex)
  }

  /** Default subscribe handler: logs the channel name and subscription count. */
  private def defaultSubscribe: (String, Int) => Unit = { (c, n) =>
    Logger.info("subscribed to " + c + " and count = " + n)
  }

  /** Default unsubscribe handler: logs the channel name and subscription count. */
  private def defaultUnsubscribe: (String, Int) => Unit = { (c, n) =>
    Logger.info("unsubscribed from " + c + " and count = " + n)
  }
}
/** Companion object offering a factory for [[RedisService]]. */
object RedisService {

  /**
   * Builds a [[RedisService]] from a Redis connection URL.
   *
   * @param uri the Redis connection URL handed to the [[RedisService]] constructor
   */
  def apply(uri: String) = {
    val service = new RedisService(uri)
    service
  }
}
/**
 * A Redis pub/sub channel exposed as an iteratee (`in`) and an enumerator (`out`).
 *
 * Strings fed into `in` are passed through the optional `send` transformation
 * and published to `channel` through a [[Publisher]] actor. Messages received
 * on the subscription are passed through the optional `receive` transformation
 * and pushed to `out`.
 *
 * @param redis       service used to borrow the subscribing client and to publish
 * @param channel     name of the Redis channel to subscribe and publish to
 * @param send        optional transformation applied to each outgoing message
 * @param receive     optional transformation applied to each incoming message
 * @param disconnect  optional thunk producing a final message that is published
 *                    when `in` completes; a `null` result is skipped
 * @param exception   optional handler for subscriber errors
 * @param subscribe   optional handler for subscribe notifications
 * @param unsubscribe optional handler for unsubscribe notifications
 */
class PubSubChannel(redis: RedisService, channel: String,
  send: Option[String => String] = None,
  receive: Option[String => String] = None,
  disconnect: Option[() => String] = None,
  exception: Option[Throwable => Unit] = None,
  subscribe: Option[(String, Int) => Unit] = None,
  unsubscribe: Option[(String, Int) => Unit] = None
) {

  // Set once the subscribing client has been returned to the pool.
  @volatile
  private var unsubscribed = false

  private val (msgEnumerator, msgChannel) = Concurrent.broadcast[String]
  private val pub = Akka.system.actorOf(Props(new Publisher(redis)))

  // Dedicated client borrowed for the subscription; handed back in `callback`
  // when an unsubscribe notification arrives.
  private val sub = {
    val client = redis.borrowClient
    client.subscribe(channel)(callback)
    client
  }

  /**
   * Iteratee that publishes every incoming string. When the input completes,
   * the optional disconnect message is published and the channel is closed.
   */
  lazy val in = Iteratee.foreach[String] { msg =>
    val str = send.map(_(msg)).getOrElse(msg)
    send(str)
  }.map { _ =>
    disconnect.foreach { f =>
      // RedisService.createPubSub always supplies a thunk, which evaluates to
      // null when the caller gave no disconnect message; never publish null.
      Option(f()).foreach { msg => send(msg) }
    }
    close
  }

  /** Enumerator of the messages received from the subscribed channel. */
  lazy val out = msgEnumerator

  /** Dispatches a pub/sub notification to the matching handler. */
  private def callback(pubsub: PubSubMessage): Unit = pubsub match {
    case E(ex) =>
      exception.foreach(_(ex))
    case S(channel, no) =>
      subscribe.foreach(_(channel, no))
    case U(channel, no) =>
      unsubscribe.foreach(_(channel, no))
      if (no == 0) {
        sub.pubSub = false
      }
      // Return the borrowed client only once.
      if (!unsubscribed) {
        unsubscribed = true
        redis.returnClient(sub)
      }
    case M(channel, msg) =>
      Logger.debug("receive: " + msg)
      val str = receive.map(_(msg)).getOrElse(msg)
      msgChannel.push(str)
  }

  /** Publishes `msg` to this channel. */
  def send(msg: String) = {
    Logger.debug("send: " + msg)
    pub ! Publish(channel, msg)
  }

  /** Publishes `msg` to the given `channel` instead of this channel's own. */
  def send(channel: String, msg: String) = {
    Logger.debug("send: " + msg)
    pub ! Publish(channel, msg)
  }

  /** Unsubscribes (unless that already happened) and stops the publisher actor. */
  def close = {
    Logger.info("close: " + channel)
    if (!unsubscribed) {
      sub.unsubscribe
    }
    // PoisonPill is queued behind earlier Publish messages sent to `pub`.
    pub ! PoisonPill
  }
}
/**
 * Message asking a [[Publisher]] to publish `message` on `channel`.
 *
 * @param channel name of the Redis channel to publish to
 * @param message payload to publish
 */
case class Publish(
  channel: String,
  message: String
)
/**
 * Actor that publishes [[Publish]] requests through the given [[RedisService]]
 * and replies to the sender with the result of the publish call.
 *
 * @param redis service supplying the client used for publishing
 */
class Publisher(redis: RedisService) extends Actor {

  def receive = {
    case Publish(target, payload) =>
      // Publish first, then reply with whatever the client returned.
      val outcome = redis.withClient { client => client.publish(target, payload) }
      sender ! outcome
  }
}